// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//


#include <pollux/dwio/parquet/reader/metadata.h>
#include <pollux/dwio/parquet/thrift/ParquetThriftTypes.h>

#include <cstring>
#include <type_traits>

namespace kumo::pollux::parquet {
    /// Reads a `T` from the first `sizeof(T)` bytes at `ptr`.
    ///
    /// The bytes are copied with `std::memcpy`, so `ptr` does not need to be
    /// aligned for `T`. The caller must guarantee that at least `sizeof(T)`
    /// bytes are readable at `ptr`.
    ///
    /// @tparam T Trivially copyable type to materialize.
    /// @param ptr Start of the source bytes.
    /// @return The value whose object representation equals the copied bytes.
    template<typename T>
    inline T load(const char *ptr) {
        // memcpy into an object is only well-defined for trivially copyable
        // types; reject anything else (e.g. std::string) at compile time.
        static_assert(
            std::is_trivially_copyable_v<T>,
            "load<T> requires a trivially copyable T");
        T value;
        std::memcpy(&value, ptr, sizeof(T));
        return value;
    }

    /// Decodes the minimum value recorded in a column chunk's Thrift statistics.
    ///
    /// `min_value` is used when it is set; `min` is consulted only when
    /// `min_value` is not set. The selected byte string is decoded with
    /// `load<T>`.
    ///
    /// @tparam T Fixed-size value type to decode.
    /// @param columnChunkStats Thrift statistics of one column chunk.
    /// @return The decoded minimum, or `std::nullopt` when neither field is set
    ///         or the selected field holds fewer than `sizeof(T)` bytes.
    template<typename T>
    inline std::optional<T> getMin(const thrift::Statistics &columnChunkStats) {
        // load<T> copies sizeof(T) bytes unconditionally, so a statistic that
        // is shorter than T must be rejected here instead of being read past
        // the end of its buffer.
        const auto decode = [](const auto &encoded) -> std::optional<T> {
            if (encoded.size() < sizeof(T)) {
                return std::nullopt;
            }
            return load<T>(encoded.data());
        };
        if (columnChunkStats.__isset.min_value) {
            return decode(columnChunkStats.min_value);
        }
        if (columnChunkStats.__isset.min) {
            return decode(columnChunkStats.min);
        }
        return std::nullopt;
    }

    /// Decodes the maximum value recorded in a column chunk's Thrift statistics.
    ///
    /// `max_value` is used when it is set; `max` is consulted only when
    /// `max_value` is not set. The selected byte string is decoded with
    /// `load<T>`.
    ///
    /// @tparam T Fixed-size value type to decode.
    /// @param columnChunkStats Thrift statistics of one column chunk.
    /// @return The decoded maximum, or `std::nullopt` when neither field is set
    ///         or the selected field holds fewer than `sizeof(T)` bytes.
    template<typename T>
    inline std::optional<T> getMax(const thrift::Statistics &columnChunkStats) {
        // load<T> copies sizeof(T) bytes unconditionally, so a statistic that
        // is shorter than T must be rejected here instead of being read past
        // the end of its buffer.
        const auto decode = [](const auto &encoded) -> std::optional<T> {
            if (encoded.size() < sizeof(T)) {
                return std::nullopt;
            }
            return load<T>(encoded.data());
        };
        if (columnChunkStats.__isset.max_value) {
            return decode(columnChunkStats.max_value);
        }
        if (columnChunkStats.__isset.max) {
            return decode(columnChunkStats.max);
        }
        return std::nullopt;
    }

    /// String specialization of getMin: returns a copy of the raw statistic
    /// bytes without decoding them. `min_value` wins when set, then `min`;
    /// `std::nullopt` is returned when neither field is set.
    template<>
    inline std::optional<std::string> getMin(
        const thrift::Statistics &columnChunkStats) {
        if (columnChunkStats.__isset.min_value) {
            return std::optional<std::string>(columnChunkStats.min_value);
        }
        if (columnChunkStats.__isset.min) {
            return std::optional<std::string>(columnChunkStats.min);
        }
        return std::nullopt;
    }

    /// String specialization of getMax: returns a copy of the raw statistic
    /// bytes without decoding them. `max_value` wins when set, then `max`;
    /// `std::nullopt` is returned when neither field is set.
    template<>
    inline std::optional<std::string> getMax(
        const thrift::Statistics &columnChunkStats) {
        if (columnChunkStats.__isset.max_value) {
            return std::optional<std::string>(columnChunkStats.max_value);
        }
        if (columnChunkStats.__isset.max) {
            return std::optional<std::string>(columnChunkStats.max);
        }
        return std::nullopt;
    }

    /// Builds reader-side column statistics from the Thrift statistics of one
    /// column chunk.
    ///
    /// The has-null flag is `null_count > 0` and the value count is
    /// `numRowsInRowGroup - null_count`; both stay unset when the Thrift
    /// `null_count` field is not set. The value count additionally stays unset
    /// when `null_count` is negative or larger than `numRowsInRowGroup`.
    /// Min/max are decoded for the integer, floating-point and string kinds;
    /// every other statistics argument is passed as `std::nullopt`.
    ///
    /// @param columnChunkStats Thrift statistics of one column chunk.
    /// @param type Type whose kind selects which statistics class is built.
    /// @param numRowsInRowGroup Row count used to derive the value count.
    /// @return A newly allocated statistics object matching `type.kind()`, or a
    ///         plain `ColumnStatistics` for kinds without a dedicated case.
    std::unique_ptr<dwio::common::ColumnStatistics> buildColumnStatisticsFromThrift(
        const thrift::Statistics &columnChunkStats,
        const pollux::Type &type,
        uint64_t numRowsInRowGroup) {
        std::optional<uint64_t> valueCount;
        std::optional<bool> hasNull;
        if (columnChunkStats.__isset.null_count) {
            const auto nullCount = columnChunkStats.null_count;
            hasNull = nullCount > 0;
            // The subtraction is unsigned: a negative or oversized null_count
            // would wrap around to a huge bogus value count, so report
            // "unknown" instead.
            if (nullCount >= 0 &&
                static_cast<uint64_t>(nullCount) <= numRowsInRowGroup) {
                valueCount = numRowsInRowGroup - static_cast<uint64_t>(nullCount);
            }
        }

        switch (type.kind()) {
            case TypeKind::BOOLEAN:
                // No min/max is decoded for booleans.
                return std::make_unique<dwio::common::BooleanColumnStatistics>(
                    valueCount, hasNull, std::nullopt, std::nullopt, std::nullopt);
            case TypeKind::TINYINT:
                return std::make_unique<dwio::common::IntegerColumnStatistics>(
                    valueCount,
                    hasNull,
                    std::nullopt,
                    std::nullopt,
                    getMin<int8_t>(columnChunkStats),
                    getMax<int8_t>(columnChunkStats),
                    std::nullopt);
            case TypeKind::SMALLINT:
                return std::make_unique<dwio::common::IntegerColumnStatistics>(
                    valueCount,
                    hasNull,
                    std::nullopt,
                    std::nullopt,
                    getMin<int16_t>(columnChunkStats),
                    getMax<int16_t>(columnChunkStats),
                    std::nullopt);
            case TypeKind::INTEGER:
                return std::make_unique<dwio::common::IntegerColumnStatistics>(
                    valueCount,
                    hasNull,
                    std::nullopt,
                    std::nullopt,
                    getMin<int32_t>(columnChunkStats),
                    getMax<int32_t>(columnChunkStats),
                    std::nullopt);
            case TypeKind::BIGINT:
                return std::make_unique<dwio::common::IntegerColumnStatistics>(
                    valueCount,
                    hasNull,
                    std::nullopt,
                    std::nullopt,
                    getMin<int64_t>(columnChunkStats),
                    getMax<int64_t>(columnChunkStats),
                    std::nullopt);
            case TypeKind::REAL:
                // Decoded as float, then handed to the double statistics class.
                return std::make_unique<dwio::common::DoubleColumnStatistics>(
                    valueCount,
                    hasNull,
                    std::nullopt,
                    std::nullopt,
                    getMin<float>(columnChunkStats),
                    getMax<float>(columnChunkStats),
                    std::nullopt);
            case TypeKind::DOUBLE:
                return std::make_unique<dwio::common::DoubleColumnStatistics>(
                    valueCount,
                    hasNull,
                    std::nullopt,
                    std::nullopt,
                    getMin<double>(columnChunkStats),
                    getMax<double>(columnChunkStats),
                    std::nullopt);
            case TypeKind::VARCHAR:
            case TypeKind::VARBINARY:
                return std::make_unique<dwio::common::StringColumnStatistics>(
                    valueCount,
                    hasNull,
                    std::nullopt,
                    std::nullopt,
                    getMin<std::string>(columnChunkStats),
                    getMax<std::string>(columnChunkStats),
                    std::nullopt);

            default:
                // Remaining kinds only get the count / has-null information.
                return std::make_unique<dwio::common::ColumnStatistics>(
                    valueCount, hasNull, std::nullopt, std::nullopt);
        }
    }

    /// Maps a Parquet Thrift compression codec onto the reader's
    /// CompressionKind. Both LZ4 and LZ4_RAW map to CompressionKind_LZ4; any
    /// codec without a case is reported through POLLUX_UNSUPPORTED.
    common::CompressionKind thriftCodecToCompressionKind(
        thrift::CompressionCodec::type codec) {
        using Kind = common::CompressionKind;
        switch (codec) {
            case thrift::CompressionCodec::UNCOMPRESSED:
                return Kind::CompressionKind_NONE;
            case thrift::CompressionCodec::SNAPPY:
                return Kind::CompressionKind_SNAPPY;
            case thrift::CompressionCodec::GZIP:
                return Kind::CompressionKind_GZIP;
            case thrift::CompressionCodec::LZO:
                return Kind::CompressionKind_LZO;
            case thrift::CompressionCodec::LZ4:
            case thrift::CompressionCodec::LZ4_RAW:
                return Kind::CompressionKind_LZ4;
            case thrift::CompressionCodec::ZSTD:
                return Kind::CompressionKind_ZSTD;
            default:
                POLLUX_UNSUPPORTED(
                    "Unsupported compression type: " +
                    kumo::pollux::parquet::thrift::to_string(codec));
        }
    }

    ColumnChunkMetaDataPtr::ColumnChunkMetaDataPtr(const void *metadata)
        : ptr_(metadata) {
    }

    // Out-of-line defaulted destructor: only the implicit member destruction
    // runs here.
    ColumnChunkMetaDataPtr::~ColumnChunkMetaDataPtr() = default;

    /// Views an opaque metadata pointer as a `thrift::ColumnChunk`. No checking
    /// is performed; the caller must pass a pointer to a `thrift::ColumnChunk`.
    MELON_ALWAYS_INLINE const thrift::ColumnChunk *thriftColumnChunkPtr(
        const void *metadata) {
        const auto *columnChunk =
            static_cast<const thrift::ColumnChunk *>(metadata);
        return columnChunk;
    }

    /// Returns `meta_data.num_values` of the wrapped column chunk.
    int64_t ColumnChunkMetaDataPtr::numValues() const {
        const auto &columnMeta = thriftColumnChunkPtr(ptr_)->meta_data;
        return columnMeta.num_values;
    }

    /// Reports whether the Thrift `meta_data` field is set on the wrapped
    /// column chunk.
    bool ColumnChunkMetaDataPtr::hasMetadata() const {
        const auto *columnChunk = thriftColumnChunkPtr(ptr_);
        return columnChunk->__isset.meta_data;
    }

    /// Reports whether the column chunk has metadata and that metadata has its
    /// `statistics` field set.
    bool ColumnChunkMetaDataPtr::hasStatistics() const {
        if (!hasMetadata()) {
            return false;
        }
        return thriftColumnChunkPtr(ptr_)->meta_data.__isset.statistics;
    }

    /// Reports whether the column chunk has metadata and that metadata has its
    /// `dictionary_page_offset` field set.
    bool ColumnChunkMetaDataPtr::hasDictionaryPageOffset() const {
        if (!hasMetadata()) {
            return false;
        }
        return thriftColumnChunkPtr(ptr_)->meta_data.__isset.dictionary_page_offset;
    }

    /// Builds column statistics for this chunk from its Thrift statistics.
    /// Fails the POLLUX_CHECK when the chunk has no statistics. `type` is
    /// dereferenced without a null check.
    ///
    /// @param type Column type forwarded to buildColumnStatisticsFromThrift.
    /// @param numRows Row count forwarded to buildColumnStatisticsFromThrift.
    std::unique_ptr<dwio::common::ColumnStatistics>
    ColumnChunkMetaDataPtr::getColumnStatistics(
        const TypePtr type,
        int64_t numRows) {
        POLLUX_CHECK(hasStatistics());
        const auto &thriftStats = thriftColumnChunkPtr(ptr_)->meta_data.statistics;
        return buildColumnStatisticsFromThrift(thriftStats, *type, numRows);
    }

    /// Returns a copy of the statistics `min_value` bytes. Fails the
    /// POLLUX_CHECK when the chunk has no statistics; whether `min_value`
    /// itself is set is not checked.
    std::string ColumnChunkMetaDataPtr::getColumnMetadataStatsMinValue() {
        POLLUX_CHECK(hasStatistics());
        const auto &stats = thriftColumnChunkPtr(ptr_)->meta_data.statistics;
        return stats.min_value;
    }

    /// Returns a copy of the statistics `max_value` bytes. Fails the
    /// POLLUX_CHECK when the chunk has no statistics; whether `max_value`
    /// itself is set is not checked.
    std::string ColumnChunkMetaDataPtr::getColumnMetadataStatsMaxValue() {
        POLLUX_CHECK(hasStatistics());
        const auto &stats = thriftColumnChunkPtr(ptr_)->meta_data.statistics;
        return stats.max_value;
    }

    /// Returns the statistics `null_count`. Fails the POLLUX_CHECK when the
    /// chunk has no statistics; whether `null_count` itself is set is not
    /// checked.
    int64_t ColumnChunkMetaDataPtr::getColumnMetadataStatsNullCount() {
        POLLUX_CHECK(hasStatistics());
        const auto &stats = thriftColumnChunkPtr(ptr_)->meta_data.statistics;
        return stats.null_count;
    }

    /// Returns `meta_data.data_page_offset` of the wrapped column chunk.
    int64_t ColumnChunkMetaDataPtr::dataPageOffset() const {
        const auto &columnMeta = thriftColumnChunkPtr(ptr_)->meta_data;
        return columnMeta.data_page_offset;
    }

    /// Returns `meta_data.dictionary_page_offset`. Fails the POLLUX_CHECK when
    /// hasDictionaryPageOffset() is false.
    int64_t ColumnChunkMetaDataPtr::dictionaryPageOffset() const {
        POLLUX_CHECK(hasDictionaryPageOffset());
        const auto &columnMeta = thriftColumnChunkPtr(ptr_)->meta_data;
        return columnMeta.dictionary_page_offset;
    }

    /// Returns the chunk's `meta_data.codec` translated by
    /// thriftCodecToCompressionKind.
    common::CompressionKind ColumnChunkMetaDataPtr::compression() const {
        const auto codec = thriftColumnChunkPtr(ptr_)->meta_data.codec;
        return thriftCodecToCompressionKind(codec);
    }

    /// Returns `meta_data.total_compressed_size` of the wrapped column chunk.
    int64_t ColumnChunkMetaDataPtr::totalCompressedSize() const {
        const auto &columnMeta = thriftColumnChunkPtr(ptr_)->meta_data;
        return columnMeta.total_compressed_size;
    }

    /// Returns `meta_data.total_uncompressed_size` of the wrapped column chunk.
    int64_t ColumnChunkMetaDataPtr::totalUncompressedSize() const {
        const auto &columnMeta = thriftColumnChunkPtr(ptr_)->meta_data;
        return columnMeta.total_uncompressed_size;
    }

    /// Views an opaque metadata pointer as a `thrift::RowGroup`. No checking is
    /// performed; the caller must pass a pointer to a `thrift::RowGroup`.
    MELON_ALWAYS_INLINE const thrift::RowGroup *thriftRowGroupPtr(
        const void *metadata) {
        const auto *rowGroup = static_cast<const thrift::RowGroup *>(metadata);
        return rowGroup;
    }

    RowGroupMetaDataPtr::RowGroupMetaDataPtr(const void *metadata)
        : ptr_(metadata) {
    }

    // Out-of-line defaulted destructor: only the implicit member destruction
    // runs here.
    RowGroupMetaDataPtr::~RowGroupMetaDataPtr() = default;

    /// Returns the number of entries in the row group's `columns` list,
    /// converted to `int`.
    int RowGroupMetaDataPtr::numColumns() const {
        const auto &columns = thriftRowGroupPtr(ptr_)->columns;
        return static_cast<int>(columns.size());
    }

    /// Returns the row group's `num_rows` field.
    int64_t RowGroupMetaDataPtr::numRows() const {
        const auto *rowGroup = thriftRowGroupPtr(ptr_);
        return rowGroup->num_rows;
    }

    /// Returns the row group's `total_byte_size` field.
    int64_t RowGroupMetaDataPtr::totalByteSize() const {
        const auto *rowGroup = thriftRowGroupPtr(ptr_);
        return rowGroup->total_byte_size;
    }

    /// Reports whether the row group's `file_offset` field is set.
    bool RowGroupMetaDataPtr::hasFileOffset() const {
        const auto *rowGroup = thriftRowGroupPtr(ptr_);
        return rowGroup->__isset.file_offset;
    }

    /// Returns the row group's `file_offset` field. Whether the field is set is
    /// not checked here; see hasFileOffset().
    int64_t RowGroupMetaDataPtr::fileOffset() const {
        const auto *rowGroup = thriftRowGroupPtr(ptr_);
        return rowGroup->file_offset;
    }

    /// Reports whether the row group's `total_compressed_size` field is set.
    bool RowGroupMetaDataPtr::hasTotalCompressedSize() const {
        const auto *rowGroup = thriftRowGroupPtr(ptr_);
        return rowGroup->__isset.total_compressed_size;
    }

    /// Returns the row group's `total_compressed_size` field. Whether the field
    /// is set is not checked here; see hasTotalCompressedSize().
    int64_t RowGroupMetaDataPtr::totalCompressedSize() const {
        const auto *rowGroup = thriftRowGroupPtr(ptr_);
        return rowGroup->total_compressed_size;
    }

    /// Returns a wrapper around the `i`-th entry of the row group's `columns`
    /// list. The wrapper stores a pointer to that entry.
    ///
    /// @param i Zero-based column index; must satisfy `0 <= i < numColumns()`,
    ///          otherwise the POLLUX_CHECK fails.
    ColumnChunkMetaDataPtr RowGroupMetaDataPtr::columnChunk(int i) const {
        // Indexing the vector out of range is undefined behavior, so reject bad
        // indices explicitly.
        POLLUX_CHECK(i >= 0 && i < numColumns());
        const auto &columns = thriftRowGroupPtr(ptr_)->columns;
        return ColumnChunkMetaDataPtr(&columns[i]);
    }

    /// Views an opaque metadata pointer as a `thrift::FileMetaData`. No checking
    /// is performed; the caller must pass a pointer to a `thrift::FileMetaData`.
    MELON_ALWAYS_INLINE const thrift::FileMetaData *thriftFileMetaDataPtr(
        const void *metadata) {
        const auto *fileMetaData =
            static_cast<const thrift::FileMetaData *>(metadata);
        return fileMetaData;
    }

    FileMetaDataPtr::FileMetaDataPtr(const void *metadata) : ptr_(metadata) {
    }

    // Out-of-line defaulted destructor: only the implicit member destruction
    // runs here.
    FileMetaDataPtr::~FileMetaDataPtr() = default;

    /// Returns a wrapper around the `i`-th entry of the file's `row_groups`
    /// list. The wrapper stores a pointer to that entry.
    ///
    /// @param i Zero-based row group index; must satisfy
    ///          `0 <= i < numRowGroups()`, otherwise the POLLUX_CHECK fails.
    RowGroupMetaDataPtr FileMetaDataPtr::rowGroup(int i) const {
        // Indexing the vector out of range is undefined behavior, so reject bad
        // indices explicitly.
        POLLUX_CHECK(i >= 0 && i < numRowGroups());
        const auto &rowGroups = thriftFileMetaDataPtr(ptr_)->row_groups;
        return RowGroupMetaDataPtr(&rowGroups[i]);
    }

    /// Returns the file metadata's `num_rows` field.
    int64_t FileMetaDataPtr::numRows() const {
        const auto *fileMetaData = thriftFileMetaDataPtr(ptr_);
        return fileMetaData->num_rows;
    }

    /// Returns the number of entries in the file's `row_groups` list, converted
    /// to `int`.
    int FileMetaDataPtr::numRowGroups() const {
        const auto &rowGroups = thriftFileMetaDataPtr(ptr_)->row_groups;
        return static_cast<int>(rowGroups.size());
    }
} // namespace kumo::pollux::parquet
